測試講了三天,應該可以證明我至少表面上很注重測試對吧?
回來正題,在開發 Airflow 單元測試的時候,有幾件事需要注意的。
Variables 是依賴於 Airflow 的,那麼在單元測試內理論上自然是不存在值的 (畢竟我們並不是真正啟一個 webserver 來運作)。如果你的程式內有直接用到它,那麼就需要 mock 掉。
例如我的主程式內有一段程式碼如下:
def get_sources():
return Variable.get("VAR_KEY1", "default_value")
那我要寫他的測試的時候,有兩種方法可以 mock 它:
@pytest.fixture()
def mock_default(mocker: MockFixture):
mocker.patch.dict("os.environ", AIRFLOW_VAR_VAR_KEY1="variable_value")
def test_something(mock_default):
result = get_sources()
assert result == "vairable_value"
或是像這樣:
def test_something():
with mock.patch.dict("os.environ", AIRFLOW_VAR_VAR_KEY1="variable_value"):
result = get_sources()
assert result == "vairable_value"
兩種方法是類似的,只是你的 mock 是不是大部份的測試都需要用到,是的話建議第一種減少重覆設定的工作。
param 則不必,因為它可以直接塞入 context 或是在 TaskFlow 下做為一般 function 的參數
同樣的,connection 也是在 webserver 內設定的,單元測試內用某個 conn_id 去找該項 connection 時,同樣也是找不到的。做法類似 Variable:
def mock_conn(mocker: MockFixture):
mock_connection = Connection(
conn_type="sqlite",
login="kk_test",
password="kk_pwd",
host="some.host.com",
port=1234,
)
mock_connection_uri = mock_connection.get_uri()
mocker.patch.dict("os.environ", AIRFLOW_CONN_CLICKHOUSE_FROM=mock_connection_uri)
如此一來,當程式內透過 “clickhouse_from” 這個 id 去尋找 connection 時,就會拿到我們 mock 出來的 sqlite 資訊。
但在你這樣做之前,其實可以先考慮試試看一件事:
建立 DAO 層
如果我們在主要的 DAG Task 內,不直接用 conn_id 取得 Connection Hook,而是多建一個 DAO 物件隔離,那麼你未必需要 mock 這段
hook = PostgresHook("connection_id")
dao = SomeDAO(hook)
@task
def query_data(dao: IXXXDao):
dao.query(user="KK")
在主程式內,我們讓 task 只認識 dao 物件的 interface (IXXXDao),如此一來在測試的時候,我們就能 mock 或是直接建一個替代品:
def test_query_data_call_dao_query(mock_dao):
query_data(mock_dao)
mock_dao.assert_called_once_with("KK")
如此一來,我們就不必去在主程式內,測試連線物件的結果。因為測試連線物件的結果幾乎就是在測第三方套件,大多數時候是不必要的。
當然,DAO 內很有可能有一些邏輯,或是 SQL。邏輯要測都是比較簡單的,SQL 就困難一點。有一種做法是在測試時也啟動一個 localhost DB,然後 mock 掉連線的 IP。另一種做法則是強制檢查 SQL 的正確性 (假設你使用 string 合併的話)
總之,多一層 DAO 的好處,就是關注點變小了,你只在意拿到 dao 來取得資料,至於 dao 背後是用 memory 記住、讀 redis、讀本地檔案,你都可以先無視掉,只專注在主程式的邏輯。